// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! A comparable row-oriented representation of a collection of [`Array`].
//!
//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
//! This makes the row format ideal for implementing efficient multi-column sorting,
//! grouping, aggregation, windowing and more, as described in more detail
//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
//!
//! For example, given three input [`Array`], [`RowConverter`] creates byte
//! sequences that [compare] the same as when using [`lexsort`].
//!
//! ```text
//!    ┌─────┐   ┌─────┐   ┌─────┐
//!    │     │   │     │   │     │
//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
//!    │     │   │     │   │     │  ─────────────▶┃             ┃
//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
//!    │     │   │     │   │     │
//!    └─────┘   └─────┘   └─────┘
//!                ...
//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
//!    │     │   │     │   │     │  ─────────────▶┃        ┃
//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
//!     UInt64      Utf8     F64
//!
//!           Input Arrays                          Row Format
//!     (Columns)
//! ```
//!
//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
//! to be meaningful._
//!
//! # Basic Example
//! ```
//! # use std::sync::Arc;
//! # use arrow_row::{RowConverter, SortField};
//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
//! # use arrow_array::cast::{AsArray, as_string_array};
//! # use arrow_array::types::Int32Type;
//! # use arrow_schema::DataType;
//!
//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
//! let arrays = vec![a1, a2];
//!
//! // Convert arrays to rows
//! let converter = RowConverter::new(vec![
//!     SortField::new(DataType::Int32),
//!     SortField::new(DataType::Utf8),
//! ]).unwrap();
//! let rows = converter.convert_columns(&arrays).unwrap();
//!
//! // Compare rows
//! for i in 0..4 {
//!     assert!(rows.row(i) <= rows.row(i + 1));
//! }
//! assert_eq!(rows.row(3), rows.row(4));
//!
//! // Convert rows back to arrays
//! let converted = converter.convert_rows(&rows).unwrap();
//! assert_eq!(arrays, converted);
//!
//! // Compare rows from different arrays
//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
//! let arrays = vec![a1, a2];
//! let rows2 = converter.convert_columns(&arrays).unwrap();
//!
//! assert!(rows.row(4) < rows2.row(0));
//! assert!(rows.row(4) < rows2.row(1));
//!
//! // Convert selection of rows back to arrays
//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
//! let converted = converter.convert_rows(selection).unwrap();
//! let c1 = converted[0].as_primitive::<Int32Type>();
//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
//!
//! let c2 = converted[1].as_string::<i32>();
//! let c2_values: Vec<_> = c2.iter().flatten().collect();
//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
//! ```
//!
//! # Lexsort
//!
//! The row format can also be used to implement a fast multi-column / lexicographic sort.
//!
//! ```
//! # use arrow_row::{RowConverter, SortField};
//! # use arrow_array::{ArrayRef, UInt32Array};
//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
//!     let fields = arrays
//!         .iter()
//!         .map(|a| SortField::new(a.data_type().clone()))
//!         .collect();
//!     let converter = RowConverter::new(fields).unwrap();
//!     let rows = converter.convert_columns(arrays).unwrap();
//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
//! }
//! ```
//!
//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
//! [compared]: PartialOrd
//! [compare]: PartialOrd

use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
use arrow_buffer::ArrowNativeType;
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};

use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::variable::{decode_binary, decode_string};

mod fixed;
mod list;
mod variable;

/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
///
/// *Note: The encoding of the row format may change from release to release.*
///
/// ## Overview
///
/// The row format is a variable length byte sequence created by
/// concatenating the encoded form of each column. The encoding for
/// each column depends on its datatype (and sort options).
///
/// The encoding is carefully designed in such a way that escaping is
/// unnecessary: it is never ambiguous as to whether a byte is part of
/// a sentinel (e.g. null) or a value.
///
/// ## Unsigned Integer Encoding
///
/// A null integer is encoded as a `0_u8`, followed by as many zeroed bytes as the
/// integer's length.
///
/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
/// integer.
///
/// ```text
///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
///    3          │03│00│00│00│      │01│00│00│00│03│
///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
///   258         │02│01│00│00│      │01│00│00│01│02│
///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
///  NULL         │??│??│??│??│      │00│00│00│00│00│
///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
///
///              32-bit (4 bytes)        Row Format
///  Value        Little Endian
/// ```
///
/// ## Signed Integer Encoding
///
/// Signed integers have their most significant sign bit flipped, and are then encoded in the
/// same manner as an unsigned integer.
///
/// ```text
///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
///
///  Value  32-bit (4 bytes)    High bit flipped      Row Format
///          Little Endian
/// ```
///
/// ## Float Encoding
///
/// Floats are converted from IEEE 754 representation to a signed integer representation
/// by flipping all bits except the sign bit if they are negative.
///
/// They are then encoded in the same manner as a signed integer.
///
/// ## Fixed Length Bytes Encoding
///
/// Fixed length bytes are encoded in the same fashion as primitive types above.
///
/// For a fixed length array of length `n`:
///
/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
///
/// A valid value is encoded as `1_u8` followed by the value bytes
///
/// ## Variable Length Bytes (including Strings) Encoding
///
/// A null is encoded as a `0_u8`.
///
/// An empty byte array is encoded as `1_u8`.
///
/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
/// encoded using a block based scheme described below.
///
/// The byte array is broken up into fixed-width blocks, each block is written in turn
/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
/// with `0_u8` and written to the output, followed by the un-padded length in bytes
/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
/// blocks using a length of 32, this is to reduce space amplification for small strings.
///
/// Note the following example encodings use a block size of 4 bytes for brevity:
///
/// ```text
///                       ┌───┬───┬───┬───┬───┬───┐
///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
///                       └───┴───┴───┴───┴───┴───┘
///
///                       ┌───┐
///  ""                   │01 │
///                       └───┘
///
///  NULL                 ┌───┐
///                       │00 │
///                       └───┘
///
/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
///                       │02 │'D'│'e'│'f'│'e'│FF │
///                       └───┼───┼───┼───┼───┼───┤
///                           │'n'│'e'│'s'│'t'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'r'│'a'│'t'│'r'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'a'│'t'│'i'│'o'│FF │
///                           ├───┼───┼───┼───┼───┤
///                           │'n'│00 │00 │00 │01 │
///                           └───┴───┴───┴───┴───┘
/// ```
///
/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
///
/// ## Dictionary Encoding
///
/// Dictionaries are hydrated to their underlying values
///
/// ## Struct Encoding
///
/// A null is encoded as a `0_u8`.
///
/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
///
/// This encoding effectively flattens the schema in a depth-first fashion.
///
/// For example
///
/// ```text
/// ┌───────┬────────────────────────┬───────┐
/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
/// └───────┴────────────────────────┴───────┘
/// ```
///
/// Is encoded as
///
/// ```text
/// ┌───────┬───────────────┬───────┬─────────┬───────┐
/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
/// └───────┴───────────────┴───────┴─────────┴───────┘
/// ```
///
/// ## List Encoding
///
/// Lists are encoded by first encoding all child elements to the row format.
///
/// A list value is then encoded as the concatenation of each of the child elements,
/// separately encoded using the variable length encoding described above, followed
/// by the variable length encoding of an empty byte array.
///
/// For example given:
///
/// ```text
/// [1_u8, 2_u8, 3_u8]
/// [1_u8, null]
/// []
/// null
/// ```
///
/// The elements would be converted to:
///
/// ```text
///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
///```
///
/// Which would be encoded as
///
/// ```text
///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
///
///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
///                          └──── 1_u8 ────┘   └──── null ────┘
///
///```
///
/// With `[]` represented by an empty byte array, and `null` a null byte array.
///
/// # Ordering
///
/// ## Float Ordering
///
/// Floats are totally ordered in accordance with the `totalOrder` predicate as defined
/// in the IEEE 754 (2008 revision) floating point standard.
///
/// The ordering established by this does not always agree with the
/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
/// they consider negative and positive zero equal, while this does not.
///
/// ## Null Ordering
///
/// The encoding described above will order nulls first, this can be inverted by representing
/// nulls as `0xFF_u8` instead of `0_u8`
///
/// ## Reverse Column Ordering
///
/// The order of a given column can be reversed by negating the encoded bytes of non-null values
///
/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
#[derive(Debug)]
pub struct RowConverter {
    /// The [`SortField`] of each column, in column order
    ///
    /// The same allocation is shared with the [`Rows`] and [`RowParser`] created
    /// from this converter, and is compared by pointer to check that a [`Row`]
    /// originated from this converter
    fields: Arc<[SortField]>,
    /// State for codecs, one [`Codec`] per entry in `fields`
    codecs: Vec<Codec>,
}

/// Per-field state created by [`Codec::new`] when a [`RowConverter`] is constructed
#[derive(Debug)]
enum Codec {
    /// No additional codec state is necessary
    Stateless,
    /// A row converter for the dictionary values
    /// and the encoding of a row containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields
    /// and the encoding of a row containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    List(RowConverter),
}

impl Codec {
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // The encoded contents will be inverted if descending is set to true
                // As such we set `descending` to false and negate nulls first if it
                // it set to true
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Creates the [`Encoder`] state needed to encode `array` with this codec
    ///
    /// For stateful codecs this converts the child data of `array` to rows up front
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        let encoder = match self {
            Codec::Stateless => Encoder::Stateless,
            Codec::Dictionary(converter, nulls) => {
                let dictionary = array.as_any_dictionary();
                let rows = converter.convert_columns(&[dictionary.values().clone()])?;
                Encoder::Dictionary(rows, nulls.row())
            }
            Codec::Struct(converter, null) => {
                let children = as_struct_array(array).columns();
                Encoder::Struct(converter.convert_columns(children)?, null.row())
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => as_list_array(array).values().clone(),
                    DataType::LargeList(_) => as_large_list_array(array).values().clone(),
                    _ => unreachable!(),
                };
                Encoder::List(converter.convert_columns(&[values])?)
            }
        };
        Ok(encoder)
    }

    /// Returns the size in bytes of the state owned by this codec
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::List(converter) => converter.size(),
            // Both variants own a child converter and the encoding of a null row
            Codec::Dictionary(converter, null_row) | Codec::Struct(converter, null_row) => {
                converter.size() + null_row.data.len()
            }
        }
    }
}

/// Per-array encoding state, created by [`Codec::encoder`] for each column
/// passed to [`RowConverter::append`]
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoding of the child array and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    ///
    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
    /// values that are masked by a null in the parent StructArray, otherwise
    /// this would establish an ordering between semantically null values
    Struct(Rows, Row<'a>),
    /// The row encoding of the child array
    List(Rows),
}

/// Configure the data type and sort order for a given column
///
/// Created with [`SortField::new`] or [`SortField::new_with_options`] and
/// provided to [`RowConverter::new`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options applied when encoding this column
    options: SortOptions,
    /// Data type of the column
    data_type: DataType,
}

impl SortField {
    /// Create a new column with the given data type
    pub fn new(data_type: DataType) -> Self {
        Self::new_with_options(data_type, Default::default())
    }

    /// Create a new column with the given data type and [`SortOptions`]
    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
        Self { options, data_type }
    }

    /// Return size of this instance in bytes.
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
    }
}

impl RowConverter {
    /// Create a new [`RowConverter`] for columns described by `fields`
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::NotYetImplemented`] if [`Self::supports_fields`] is `false`
    /// for `fields`, or any error encountered creating the per-field codec state
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if Self::supports_fields(&fields) {
            let codecs = fields
                .iter()
                .map(Codec::new)
                .collect::<Result<Vec<_>, ArrowError>>()?;

            return Ok(Self {
                fields: fields.into(),
                codecs,
            });
        }

        Err(ArrowError::NotYetImplemented(format!(
            "Row format support not yet implemented for: {fields:?}"
        )))
    }

    /// Returns `true` if the data type of every field in `fields` is supported
    /// by the row format.
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields
            .iter()
            .map(|field| &field.data_type)
            .all(Self::supports_datatype)
    }

    /// Returns `true` if `d` can be encoded by the row format
    ///
    /// This must only accept nested types for which `Codec::new` can create a codec,
    /// otherwise [`Self::supports_fields`] would report support for fields that
    /// [`RowConverter::new`] subsequently rejects
    fn supports_datatype(d: &DataType) -> bool {
        match d {
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) => Self::supports_datatype(f.data_type()),
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            // Remaining nested types, e.g. `DataType::Map`, have no corresponding codec
            _ => false,
        }
    }

    /// Convert [`ArrayRef`] columns into a newly allocated [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// # Errors
    ///
    /// Returns an error if the number of `columns`, or the data type of any column,
    /// does not match the fields provided to [`RowConverter::new`]
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        // The row count is taken from the first column, and is 0 without any columns
        let num_rows = match columns.first() {
            Some(column) => column.len(),
            None => 0,
        };

        let mut converted = self.empty_rows(num_rows, 0);
        self.append(&mut converted, columns)?;
        Ok(converted)
    }

    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
    ///
    /// See [`Row`] for information on when [`Row`] can be compared
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::InvalidArgumentError`] if the number of `columns`, or the
    /// data type of any column, does not match the fields provided to [`RowConverter::new`]
    ///
    /// # Panics
    ///
    /// Panics if the provided [`Rows`] were not created by this [`RowConverter`]
    ///
    /// # Example
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let a1 = StringArray::from(vec!["hello", "world"]);
    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
    ///
    /// let mut rows = converter.empty_rows(5, 128);
    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
    ///
    /// let back = converter.convert_rows(&rows).unwrap();
    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
    /// ```
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        // Rows share the `fields` allocation of the converter that created them
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        // Check the data type of each column, and create its encoder state.
        // `rows` has not been modified if this returns an error
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Index of the first row to be written by this call
        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);

        // We initialize the offsets shifted down by one row index.
        //
        // As the rows are appended to the offsets will be incremented to match
        //
        // For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
        // The offsets would be initialized to `0, 0, 3, 7`
        //
        // Writing the first row entirely would yield `0, 3, 3, 7`
        // The second, `0, 3, 7, 7`
        // The third, `0, 3, 7, 13`
        //
        // This would be the final offsets for reading
        //
        // In this way offsets tracks the position during writing whilst eventually serving
        // as identifying the offsets of the written rows
        rows.offsets.reserve(lengths.len());
        let mut cur_offset = rows.offsets[write_offset];
        for l in lengths {
            rows.offsets.push(cur_offset);
            cur_offset = cur_offset.checked_add(l).expect("overflow");
        }

        // Note this will not zero out any trailing data in `rows.buffer`,
        // e.g. resulting from a call to `Rows::clear`, relying instead on the
        // encoders not assuming a zero-initialized buffer
        rows.buffer.resize(cur_offset, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            // We encode a column at a time to minimise dispatch overheads
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        // Sanity check, in debug builds only, that the offsets are consistent with the buffer
        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

    /// Convert [`Row`]s back into [`ArrayRef`] columns, one per field
    ///
    /// # Panics
    ///
    /// Panics if the rows were not produced by this [`RowConverter`]
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let rows = rows.into_iter();
        let mut raw_rows: Vec<&[u8]> = Vec::with_capacity(rows.size_hint().0);

        // UTF-8 validation is performed if any of the rows requires it
        let mut validate_utf8 = false;
        for row in rows {
            assert!(
                Arc::ptr_eq(&row.config.fields, &self.fields),
                "rows were not produced by this RowConverter"
            );
            validate_utf8 = validate_utf8 || row.config.validate_utf8;
            raw_rows.push(row.data);
        }

        // SAFETY
        // We have validated that the rows came from this [`RowConverter`]
        // and therefore must be valid
        unsafe { self.convert_raw(&mut raw_rows, validate_utf8) }
    }

    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
    /// a total length of `data_capacity`
    ///
    /// This can be used to buffer a selection of [`Row`]
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use std::collections::HashSet;
    /// # use arrow_array::cast::AsArray;
    /// # use arrow_array::StringArray;
    /// # use arrow_row::{Row, RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// #
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
    ///
    /// // Convert to row format and deduplicate
    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
    /// let mut distinct_rows = converter.empty_rows(3, 100);
    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
    ///
    /// // Note: we could skip buffering and feed the filtered iterator directly
    /// // into convert_rows, this is done for demonstration purposes only
    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
    /// assert_eq!(&values, &["hello", "world", "a"]);
    /// ```
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

    /// Decodes the raw row bytes in `rows` into one [`ArrayRef`] per field
    ///
    /// Stops at, and returns, the first error encountered decoding a column
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data for this [`RowConverter`]
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        let mut columns = Vec::with_capacity(self.fields.len());
        // Columns are decoded one at a time, in field order
        for (field, codec) in self.fields.iter().zip(&self.codecs) {
            columns.push(decode_column(field, rows, codec, validate_utf8)?);
        }
        Ok(columns)
    }

    /// Returns a [`RowParser`], sharing the fields of this converter, that can
    /// be used to parse [`Row`] from bytes
    pub fn parser(&self) -> RowParser {
        let fields = self.fields.clone();
        RowParser::new(fields)
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`.
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}

/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    /// The config referenced by every [`Row`] returned from [`RowParser::parse`]
    config: RowConfig,
}

impl RowParser {
    /// Creates a [`RowParser`] for `fields`, with UTF-8 validation enabled for
    /// the rows it parses
    fn new(fields: Arc<[SortField]>) -> Self {
        let config = RowConfig {
            validate_utf8: true,
            fields,
        };
        Self { config }
    }

    /// Creates a [`Row`] from the provided `bytes`.
    ///
    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
        let config = &self.config;
        Row {
            data: bytes,
            config,
        }
    }
}

/// The config of a given set of [`Row`]
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    ///
    /// Compared by pointer against the fields of a [`RowConverter`] to check
    /// that rows were produced by it
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting to arrow arrays
    ///
    /// Initially `false` for [`Rows`] created by [`RowConverter::empty_rows`], and
    /// `true` for a [`RowParser`]
    validate_utf8: bool,
}

/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    ///
    /// Contains one more element than there are rows, starting with a leading `0`
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}

impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if `row` was not produced by the same [`RowConverter`] as this [`Rows`]
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Once any pushed row requires UTF-8 validation, so do these rows
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is not less than [`Rows::num_rows`]
    pub fn row(&self, row: usize) -> Row<'_> {
        let end = self.offsets[row + 1];
        let start = self.offsets[row];
        Row {
            data: &self.buffer[start..end],
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    ///
    /// The leading `0` offset is kept, as are the existing allocations
    pub fn clear(&mut self) {
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`] in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        // `offsets` always contains a leading `0` in addition to one entry per row
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`] in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`, and the full allocated capacity of the
    /// row data and offsets, not just the portion currently in use.
    pub fn size(&self) -> usize {
        // Size of fields is accounted for as part of RowConverter
        //
        // Capacity is used instead of length so that memory retained by
        // `Rows::clear`, or reserved up front, is still accounted for
        std::mem::size_of::<Self>()
            + self.buffer.capacity()
            + self.offsets.capacity() * std::mem::size_of::<usize>()
    }
}

impl<'a> IntoIterator for &'a Rows {
    type Item = Row<'a>;
    type IntoIter = RowsIter<'a>;

    /// Creates an iterator covering every [`Row`], starting from the first
    fn into_iter(self) -> Self::IntoIter {
        let end = self.num_rows();
        RowsIter {
            rows: self,
            start: 0,
            end,
        }
    }
}

/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The [`Rows`] being iterated
    rows: &'a Rows,
    /// Index of the row returned by the next call to `next`
    start: usize,
    /// Iteration stops once `start` reaches this index; initialised to the number of rows
    end: usize,
}

impl<'a> Iterator for RowsIter<'a> {
    type Item = Row<'a>;

    /// Yields the row at `start` and advances past it, or returns `None`
    /// once `start` has caught up with `end`
    fn next(&mut self) -> Option<Self::Item> {
        match self.start {
            idx if idx == self.end => None,
            idx => {
                let row = self.rows.row(idx);
                self.start = idx + 1;
                Some(row)
            }
        }
    }

    /// The number of rows left is known exactly, so both bounds are equal
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.len();
        (remaining, Some(remaining))
    }
}

impl<'a> ExactSizeIterator for RowsIter<'a> {
    /// Number of rows between `start` and `end`
    fn len(&self) -> usize {
        let (first, last) = (self.start, self.end);
        last - first
    }
}

impl<'a> DoubleEndedIterator for RowsIter<'a> {
    /// Returns the last row that has not been yielded yet, or `None` once
    /// `start` and `end` have met
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.end == self.start {
            return None;
        }
        // `end` is an exclusive bound (it starts out as `num_rows()`), so the row to
        // yield lives at `end - 1`. Step back first: indexing with `end` itself would
        // read past the final offset on the first call and would never yield row `start`.
        self.end -= 1;
        Some(self.rows.row(self.end))
    }
}

/// A comparable representation of a row.
///
/// See the [module level documentation](self) for more details.
///
/// Two [`Row`] can only be compared if they both belong to [`Rows`]
/// returned by calls to [`RowConverter::convert_columns`] on the same
/// [`RowConverter`]. If different [`RowConverter`]s are used, any
/// ordering established by comparing the [`Row`] is arbitrary.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row; the only field used for comparison and hashing
    data: &'a [u8],
    /// The config of the container this row borrows from
    config: &'a RowConfig,
}

impl<'a> Row<'a> {
    /// Create owned version of the row to detach it from the shared [`Rows`].
    pub fn owned(&self) -> OwnedRow {
        OwnedRow {
            data: self.data.into(),
            config: self.config.clone(),
        }
    }
}

// Implement these manually rather than deriving them, so that only `data` (and not `config`) takes part

impl<'a> PartialEq for Row<'a> {
    /// Two rows are equal when their encoded bytes are equal
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.data == other.data
    }
}

impl<'a> Eq for Row<'a> {}

impl<'a> PartialOrd for Row<'a> {
    /// Always `Some`, deferring to the total order given by [`Ord`]
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<'a> Ord for Row<'a> {
    /// Orders rows by comparing their encoded bytes
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(self.data, other.data)
    }
}

impl<'a> Hash for Row<'a> {
    /// Feeds only the encoded bytes into `state`
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(self.data, state)
    }
}

impl<'a> AsRef<[u8]> for Row<'a> {
    /// Exposes the encoded bytes of this row
    #[inline]
    fn as_ref(&self) -> &[u8] {
        let bytes: &[u8] = self.data;
        bytes
    }
}

/// Owned version of a [`Row`] that can be moved/cloned freely.
///
/// This contains the data for the one specific row (not the entire buffer of all rows).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this one row
    data: Box<[u8]>,
    /// An owned copy of the config of the [`Row`] this was created from
    config: RowConfig,
}

impl OwnedRow {
    /// Borrows this [`OwnedRow`] as a [`Row`].
    ///
    /// Useful for comparing an [`OwnedRow`] against a [`Row`].
    pub fn row(&self) -> Row<'_> {
        let data: &[u8] = &self.data;
        let config = &self.config;
        Row { data, config }
    }
}

// Implement these manually rather than deriving them, so that `config` takes no part; each one simply reuses the corresponding `Row` implementation.

impl PartialEq for OwnedRow {
    /// Equality of the borrowed [`Row`] views
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.row() == other.row()
    }
}

impl Eq for OwnedRow {}

impl PartialOrd for OwnedRow {
    /// Always `Some`, deferring to the total order given by [`Ord`]
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for OwnedRow {
    /// Ordering of the borrowed [`Row`] views
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        let (lhs, rhs) = (self.row(), other.row());
        lhs.cmp(&rhs)
    }
}

impl Hash for OwnedRow {
    /// Hashes exactly as the borrowed [`Row`] view does
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        let view = self.row();
        view.hash(state)
    }
}

impl AsRef<[u8]> for OwnedRow {
    /// Exposes the encoded bytes of this row
    #[inline]
    fn as_ref(&self) -> &[u8] {
        let bytes: &[u8] = &self.data;
        bytes
    }
}

/// Returns the byte used to mark a null: `0` when `options.nulls_first` is set,
/// `0xFF` otherwise
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    if options.nulls_first {
        0
    } else {
        0xFF
    }
}

/// Computes the encoded length, in bytes, of every row of `cols`
///
/// Returns one entry per row. The row count is taken from the first column (0 if `cols`
/// is empty), and each entry is the sum of what every `(column, encoder)` pair
/// contributes to that row.
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut lengths = vec![0; num_rows];

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    // Primitive and boolean columns add the same amount to every row
                    array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
                    // Null columns contribute nothing
                    DataType::Null => {},
                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
                    // Binary and string columns are measured value by value
                    DataType::Binary => as_generic_binary_array::<i32>(array)
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
                    DataType::LargeBinary => as_generic_binary_array::<i64>(array)
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
                    DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice)
                        }),
                    DataType::Utf8 => array.as_string::<i32>()
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                        }),
                    DataType::LargeUtf8 => array.as_string::<i64>()
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                        }),
                    DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
                        *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                    }),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // One extra byte on top of the `len` value bytes
                        // (presumably the null sentinel -- confirm against `fixed`)
                        lengths.iter_mut().for_each(|x| *x += 1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                downcast_dictionary_array! {
                    array => {
                        // Each key contributes the length of the row it points at in
                        // `values`; a null key contributes the length of `null`
                        for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
                            *length += match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            }
                        }
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                // One leading byte plus either the row at the same index in `rows`
                // or, for a null struct, the `null` row
                lengths.iter_mut().enumerate().for_each(|(idx, length)| {
                    match array.is_valid(idx) {
                        true => *length += 1 + rows.row(idx).as_ref().len(),
                        false => *length += 1 + null.data.len(),
                    }
                });
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(&mut lengths, rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
                }
                _ => unreachable!(),
            },
        }
    }

    lengths
}

/// Encodes `column` into `data`, incrementing the entries of `offsets` as it progresses
///
/// `encoder` selects how the column is encoded and `opts` carries the sort options
/// handed to the per-type encoders.
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Use the null-aware encoder only when the column actually has nulls
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                // Null columns write nothing
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                // String columns are encoded through their underlying bytes
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            // `offsets[idx + 1]` is the write position for row `idx`
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    // A valid struct is marked with 0x01 and followed by its row from `rows`;
                    // a null struct gets the null sentinel followed by the `null` row
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            _ => unreachable!(),
        },
    }
}

/// Encodes the values referenced by a dictionary column, without preserving the
/// dictionary encoding
///
/// For every key of `column`, copies the row of `values` that the key points at (or
/// `null` for a null key) into `data` at the matching entry of `offsets`, then moves
/// that entry past the bytes written. The first entry of `offsets` is left untouched.
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &DictionaryArray<K>,
    values: &Rows,
    null: &Row<'_>,
) {
    offsets
        .iter_mut()
        .skip(1)
        .zip(column.keys())
        .for_each(|(offset, key)| {
            let encoded = key.map_or(null.data, |k| values.row(k.as_usize()).data);
            let start = *offset;
            let end = start + encoded.len();
            data[start..end].copy_from_slice(encoded);
            *offset = end;
        });
}

// Helper handed to `downcast_primitive!` in `decode_column`: decodes `$rows` as the
// primitive type `$t` and wraps the resulting array in an `Arc`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}

/// Decodes the provided `field` from `rows`
///
/// `codec` selects how the column is decoded, and `validate_utf8` is forwarded to the
/// string decoders and to nested converters.
///
/// # Errors
///
/// Returns [`ArrowError::NotYetImplemented`] for a stateless field whose data type is not
/// handled below, and propagates any error from nested converters.
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type)))
            }
        }
        Codec::Dictionary(converter, _) => {
            // The nested converter decodes a single column; return it as-is
            let cols = converter.convert_raw(rows, validate_utf8)?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            let (null_count, nulls) = fixed::decode_nulls(rows);
            // Skip the leading validity byte of every row before decoding the children
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = converter.convert_raw(rows, validate_utf8)?;

            let child_data = children.iter().map(|c| c.to_data()).collect();
            let builder = ArrayDataBuilder::new(field.data_type.clone())
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            Arc::new(StructArray::from(builder.build_unchecked()))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
            }
            DataType::LargeList(_) => {
                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
            }
            _ => unreachable!(),
        },
    };
    Ok(array)
}

#[cfg(test)]
mod tests {
    use rand::distributions::uniform::SampleUniform;
    use rand::distributions::{Distribution, Standard};
    use rand::{thread_rng, Rng};

    use arrow_array::builder::*;
    use arrow_array::types::*;
    use arrow_array::*;
    use arrow_buffer::{i256, NullBuffer};
    use arrow_buffer::{Buffer, OffsetBuffer};
    use arrow_cast::display::{ArrayFormatter, FormatOptions};
    use arrow_ord::sort::{LexicographicalComparator, SortColumn};

    use super::*;

    /// Checks the exact bytes, the ordering and the round trip of an
    /// `Int16` + `Float32` pair of columns containing nulls
    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Every row is 8 bytes: 3 for the Int16 value followed by 5 for the Float32 value
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        // Each row is listed as two lines: the Int16 bytes, then the Float32 bytes
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, //
                1, 191, 166, 102, 102, //
                1, 128, 2, //
                1, 192, 32, 0, 0, //
                0, 0, 0, //
                0, 0, 0, 0, 0, //
                1, 127, 251, //
                1, 192, 128, 0, 0, //
                1, 128, 2, //
                1, 189, 204, 204, 205, //
                1, 128, 2, //
                1, 63, 127, 255, 255, //
                1, 128, 0, //
                1, 127, 255, 255, 255 //
            ]
        );

        // (-5, 4.0) < (0, -0.0)
        assert!(rows.row(3) < rows.row(6));
        // (1, 1.3) < (2, 2.5)
        assert!(rows.row(0) < rows.row(1));
        // (-5, 4.0) < (1, 1.3)
        assert!(rows.row(3) < rows.row(0));
        // (2, 0.1) < (2, 2.5): the first column ties, the second decides
        assert!(rows.row(4) < rows.row(1));
        // (2, -4.0) < (2, 0.1)
        assert!(rows.row(5) < rows.row(4));

        // Decoding the rows must give back the original columns
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }

    /// Checks ordering and round trip for `Decimal128`, including the extreme values
    #[test]
    fn test_decimal128() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
            DECIMAL128_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        // The input is listed in ascending order, with the null first
        let col = Arc::new(
            Decimal128Array::from_iter([
                None,
                Some(i128::MIN),
                Some(-13),
                Some(46_i128),
                Some(5456_i128),
                Some(i128::MAX),
            ])
            .with_precision_and_scale(38, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        // Each row must therefore sort strictly before the one after it
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }

    /// Checks ordering and round trip for `Decimal256`, including the extreme values
    #[test]
    fn test_decimal256() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
            DECIMAL256_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        // The input is listed in ascending order, with the null first
        let col = Arc::new(
            Decimal256Array::from_iter([
                None,
                Some(i256::MIN),
                Some(i256::from_parts(0, -1)),
                Some(i256::from_parts(u128::MAX, -1)),
                Some(i256::from_parts(u128::MAX, 0)),
                Some(i256::from_parts(0, 46_i128)),
                Some(i256::from_parts(5, 46_i128)),
                Some(i256::MAX),
            ])
            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        // Each row must therefore sort strictly before the one after it
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }

    /// Checks ordering and round trip for booleans under the default options and
    /// under descending / nulls-last options
    #[test]
    fn test_bool() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();

        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;

        // Default options: null < false < true
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(2) > rows.row(0));
        assert!(rows.row(1) > rows.row(0));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Boolean,
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        )])
        .unwrap();

        // Descending with nulls last: true < false < null
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(1) < rows.row(0));
        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }

    /// Checks that the timezone of a timestamp column survives a round trip, both for
    /// a plain column and for the values of a dictionary column
    #[test]
    fn test_timezone() {
        let timestamps =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let expected_type = timestamps.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(expected_type.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(timestamps) as _])
            .unwrap();
        let decoded = converter.convert_rows(&rows).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].data_type(), &expected_type);

        // Test dictionary
        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        builder.append(34).unwrap();
        builder.append_null();
        builder.append(345).unwrap();

        // Construct dictionary with a timezone
        let dict = builder.finish();
        let plain_values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(plain_values.with_timezone("+02:00")));
        let value_type = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(value_type.clone()));

        assert_eq!(dict_with_tz.data_type(), &dict_type);
        let converter = RowConverter::new(vec![SortField::new(dict_type.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let decoded = converter.convert_rows(&rows).unwrap();
        assert_eq!(decoded.len(), 1);
        // The decoded column is expected to carry the dictionary's value type
        assert_eq!(decoded[0].data_type(), &value_type);
    }

    /// A `Null` column yields one row per element, each with no bytes
    #[test]
    fn test_null_encoding() {
        let nulls = Arc::new(NullArray::new(10));
        let sort_fields = vec![SortField::new(DataType::Null)];
        let converter = RowConverter::new(sort_fields).unwrap();
        let encoded = converter.convert_columns(&[nulls]).unwrap();
        assert_eq!(encoded.num_rows(), 10);
        assert_eq!(encoded.row(1).data.len(), 0);
    }

    /// Checks ordering and round trip for variable-width (string and binary) columns,
    /// including values whose lengths sit around the encoder's block sizes
    #[test]
    fn test_variable_width() {
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // "he" < "hello"
        assert!(rows.row(1) < rows.row(0));
        // null < ""
        assert!(rows.row(2) < rows.row(4));
        // "foo" < "hello"
        assert!(rows.row(3) < rows.row(0));
        // "foo" < "he"
        assert!(rows.row(3) < rows.row(1));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // The binary values below are listed in ascending order, with the null first
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Every earlier row must sort strictly before every later row
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Descending with nulls last reverses the whole order
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }

    /// Asserts that `a` is logically equal to `b`
    ///
    /// When `b` is dictionary-encoded, `a` must have the dictionary's value type and
    /// equal `b` cast to that type; otherwise the two arrays are compared directly
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        if let DataType::Dictionary(_, value_type) = b.data_type() {
            assert_eq!(a.data_type(), value_type.as_ref());
            let unpacked = arrow_cast::cast(b, value_type).unwrap();
            assert_eq!(a, unpacked.as_ref())
        } else {
            assert_eq!(a, b)
        }
    }

    /// Checks ordering, equality across separately converted arrays, and round trip
    /// for a string dictionary under several sort options
    #[test]
    fn test_string_dictionary() {
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        // null < ""
        assert!(rows_a.row(3) < rows_a.row(5));
        // "he" < "hello"
        assert!(rows_a.row(2) < rows_a.row(1));
        // "foo" < "hello"
        assert!(rows_a.row(0) < rows_a.row(1));
        // null < "foo"
        assert!(rows_a.row(3) < rows_a.row(0));

        // Every occurrence of "hello" encodes to the same row
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        // Rows from a second array converted by the same converter remain comparable
        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0));
        assert_eq!(rows_a.row(3), rows_b.row(1));
        // "cupcakes" < "foo"
        assert!(rows_b.row(2) < rows_a.row(0));

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        )])
        .unwrap();

        // Descending with nulls last: every comparison above is reversed
        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions {
                descending: true,
                nulls_first: true,
            },
        )])
        .unwrap();

        // Descending with nulls first: values are reversed but the null still sorts first
        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }

    /// Checks ordering and round trip for a struct column, with and without null structs
    #[test]
    fn test_struct() {
        // Test basic
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        // The input structs are already in ascending order
        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Test struct nullability
        // Validity bitmap 0b1010: rows 1 and 3 are valid, rows 0 and 2 are null
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
        assert!(r2.row(0) < r2.row(1)); // Nulls first
        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // Values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        // The decoded array must also pass full validation
        back[0].to_data().validate_full().unwrap();
    }

    /// Checks that rows of an `Int32` dictionary order by the dictionary's values
    #[test]
    fn test_primitive_dictionary() {
        // Logical values by row: 2, 3, 0, null, 5, 3, -1
        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
        builder.append(2).unwrap();
        builder.append(3).unwrap();
        builder.append(0).unwrap();
        builder.append_null();
        builder.append(5).unwrap();
        builder.append(3).unwrap();
        builder.append(-1).unwrap();

        let a = builder.finish();
        let data_type = a.data_type().clone();
        let columns = [Arc::new(a) as ArrayRef];

        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();
        // 2 < 3
        assert!(rows.row(0) < rows.row(1));
        // 0 < 2
        assert!(rows.row(2) < rows.row(0));
        // null < 0
        assert!(rows.row(3) < rows.row(2));
        // -1 < 0
        assert!(rows.row(6) < rows.row(2));
        // null < -1
        assert!(rows.row(3) < rows.row(6));
    }

    /// Checks that a null key and a key pointing at a null value encode identically
    #[test]
    fn test_dictionary_nulls() {
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        // Logical values by row: 1, 1, -1, null (value), null (value), null (key)
        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        // Same key, same row
        assert_eq!(rows.row(0), rows.row(1));
        // Two different keys that both point at null values
        assert_eq!(rows.row(3), rows.row(4));
        // A null value and a null key
        assert_eq!(rows.row(4), rows.row(5));
        // null < 1
        assert!(rows.row(3) < rows.row(0));
    }

    /// Decoding row bytes that are not valid UTF-8 through a `Utf8` converter must panic.
    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8() {
        // Encode a single non UTF-8 byte through a Binary converter.
        let binary_converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let invalid_bytes: ArrayRef = Arc::new(BinaryArray::from_iter_values([&[0xFF]]));
        let binary_rows = binary_converter.convert_columns(&[invalid_bytes]).unwrap();
        let encoded = binary_rows.row(0);

        // Re-interpret those encoded bytes as a row of a Utf8 converter.
        let utf8_converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = utf8_converter.parser();
        let forged_row = parser.parse(encoded.as_ref());

        utf8_converter
            .convert_rows(std::iter::once(forged_row))
            .unwrap();
    }

    /// Rows must be rejected by any converter other than the one that produced them.
    #[test]
    #[should_panic(expected = "rows were not produced by this RowConverter")]
    fn test_different_converter() {
        let input: ArrayRef = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));

        let producer = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let rows = producer.convert_columns(&[input]).unwrap();

        // Configured identically to `producer`, yet a distinct converter instance.
        let stranger = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let _ = stranger.convert_rows(&rows);
    }

    /// Checks row ordering and round-tripping for a single-level list of `Int32`.
    ///
    /// The list column under test holds, by row index:
    ///
    /// ```text
    /// 0: [32, 52, 32]
    /// 1: [32, 52, 12]
    /// 2: [32, 52]
    /// 3: null          (two child values are masked by the null entry)
    /// 4: [32, null]
    /// 5: []
    /// ```
    ///
    /// The column is converted with the default sort options and then with three
    /// explicit `SortOptions` combinations. Each time the relative ordering of the
    /// rows is asserted and the rows are converted back and compared with the input.
    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        // Row 0: [32, 52, 32]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        // Row 1: [32, 52, 12]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        // Row 2: [32, 52]
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        // Row 3: null; the two child values below sit behind the null list entry
        builder.values().append_value(32); // MASKED
        builder.values().append_value(52); // MASKED
        builder.append(false);
        // Row 4: [32, null]
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        // Row 5: []
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default sort options; the assertions below expect ascending order with nulls first
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []

        // The decoded array must be valid and equal to the input
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last
        let options = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions {
            descending: true,
            nulls_first: false,
        };
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) > rows.row(5)); // null > []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions {
            descending: true,
            nulls_first: true,
        };
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
        assert!(rows.row(3) < rows.row(5)); // null < []

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }

    /// Checks row ordering and round-tripping for a list of lists of `Int32`.
    ///
    /// The same five-row column is converted under three `SortOptions` combinations.
    /// Each time the relative ordering of the rows is asserted and the rows are
    /// converted back, validated, and compared with the input.
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // Row 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 2: [[1, null], null]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        // Row 3: null
        builder.append(false);

        // Row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Resulting column:
        // [
        //   [[1, 2], [1, null]],
        //   [[1, null], [1, null]],
        //   [[1, null], null],
        //   null,
        //   [[1, 2]],
        // ]
        // Ascending, nulls first
        let options = SortOptions {
            descending: false,
            nulls_first: true,
        };
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        // The decoded array must be valid and equal to the input
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions {
            descending: true,
            nulls_first: true,
        };
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions {
            descending: true,
            nulls_first: false,
        };
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }

    /// Runs the single-level and nested list checks with 32-bit list offsets.
    #[test]
    fn test_list() {
        type Offset = i32;

        test_single_list::<Offset>();
        test_nested_list::<Offset>();
    }

    /// Runs the single-level and nested list checks with 64-bit list offsets.
    #[test]
    fn test_large_list() {
        type Offset = i64;

        test_single_list::<Offset>();
        test_nested_list::<Offset>();
    }

    /// Generates a `PrimitiveArray` of `len` random slots.
    ///
    /// Each slot holds a random value with probability `valid_percent` and is null otherwise.
    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
    where
        K: ArrowPrimitiveType,
        Standard: Distribution<K::Native>,
    {
        let mut rng = thread_rng();
        let slots = (0..len).map(|_| {
            if rng.gen_bool(valid_percent) {
                Some(rng.gen())
            } else {
                None
            }
        });
        slots.collect()
    }

    /// Generates a string array of `len` random slots.
    ///
    /// Each slot is non-null with probability `valid_percent`; a non-null slot holds
    /// between 0 and 99 random bytes drawn from `0..128`.
    fn generate_strings<O: OffsetSizeTrait>(
        len: usize,
        valid_percent: f64,
    ) -> GenericStringArray<O> {
        let mut rng = thread_rng();
        (0..len)
            .map(|_| {
                if !rng.gen_bool(valid_percent) {
                    return None;
                }
                let char_count = rng.gen_range(0..100);
                let ascii: Vec<u8> = (0..char_count).map(|_| rng.gen_range(0..128)).collect();
                Some(String::from_utf8(ascii).unwrap())
            })
            .collect()
    }

    /// Generates a `StringViewArray` of `len` random slots.
    ///
    /// A slot is null unless a coin flip weighted by `valid_percent` succeeds, in which
    /// case it holds a string of up to 99 bytes, each drawn from `0..128`.
    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
        let mut rng = thread_rng();
        let slots = (0..len).map(|_| {
            if rng.gen_bool(valid_percent) {
                let byte_count = rng.gen_range(0..100);
                let raw: Vec<u8> = (0..byte_count).map(|_| rng.gen_range(0..128)).collect();
                Some(String::from_utf8(raw).unwrap())
            } else {
                None
            }
        });
        slots.collect()
    }

    /// Generates a `BinaryViewArray` of `len` random slots.
    ///
    /// Each slot is non-null with probability `valid_percent`; a non-null slot holds
    /// between 0 and 99 values drawn from `0..128`.
    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
        let mut rng = thread_rng();
        (0..len)
            .map(|_| {
                if rng.gen_bool(valid_percent) {
                    let byte_count = rng.gen_range(0..100);
                    let payload = (0..byte_count).map(|_| rng.gen_range(0..128));
                    Some(payload.collect::<Vec<_>>())
                } else {
                    None
                }
            })
            .collect()
    }

    /// Generates a dictionary array of `len` random keys over the supplied `values`.
    ///
    /// Each key is non-null with probability `valid_percent`; non-null keys are drawn
    /// from `0..values.len()`.
    fn generate_dictionary<K>(
        values: ArrayRef,
        len: usize,
        valid_percent: f64,
    ) -> DictionaryArray<K>
    where
        K: ArrowDictionaryKeyType,
        K::Native: SampleUniform,
    {
        let mut rng = thread_rng();

        // Half-open key range covering every slot of `values`.
        let first_key = K::Native::from_usize(0).unwrap();
        let end_key = K::Native::from_usize(values.len()).unwrap();

        let random_keys: PrimitiveArray<K> = (0..len)
            .map(|_| {
                if rng.gen_bool(valid_percent) {
                    Some(rng.gen_range(first_key..end_key))
                } else {
                    None
                }
            })
            .collect();

        let value_type = Box::new(values.data_type().clone());
        let dict_type = DataType::Dictionary(Box::new(K::DATA_TYPE), value_type);

        // Re-type the key array as a dictionary and attach the values as its child.
        let dict_data = random_keys
            .into_data()
            .into_builder()
            .data_type(dict_type)
            .add_child_data(values.to_data())
            .build()
            .unwrap();

        DictionaryArray::from(dict_data)
    }

    /// Generates a `FixedSizeBinaryArray` of `len` slots with a random width in `0..20`.
    ///
    /// Each slot is filled with random bytes with probability `valid_percent`, otherwise null.
    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
        let mut rng = thread_rng();
        let width = rng.gen_range(0..20);
        let mut array_builder = FixedSizeBinaryBuilder::new(width);

        // Scratch buffer reused for every non-null slot.
        let mut scratch = vec![0_u8; width as usize];
        for _ in 0..len {
            if rng.gen_bool(valid_percent) {
                for byte in scratch.iter_mut() {
                    *byte = rng.gen();
                }
                array_builder.append_value(&scratch).unwrap();
            } else {
                array_builder.append_null();
            }
        }

        array_builder.finish()
    }

    /// Generates a `StructArray` of `len` rows with a nullable `Int32` field `a` and a
    /// nullable `Utf8` field `b`.
    ///
    /// `valid_percent` is used for the struct-level validity as well as for both children.
    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
        let mut rng = thread_rng();
        let validity = NullBuffer::from_iter((0..len).map(|_| rng.gen_bool(valid_percent)));

        let ints = generate_primitive_array::<Int32Type>(len, valid_percent);
        let strings = generate_strings::<i32>(len, valid_percent);

        let schema = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let children = vec![Arc::new(ints) as _, Arc::new(strings) as _];

        StructArray::new(schema, children, Some(validity))
    }

    /// Generates a `ListArray` of `len` entries, each spanning between 0 and 9 child values.
    ///
    /// `values` is called once with the total number of child values required and is
    /// expected to return the child array. Each list entry is non-null with probability
    /// `valid_percent`.
    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
    where
        F: FnOnce(usize) -> ArrayRef,
    {
        let mut rng = thread_rng();

        let entry_lengths = (0..len).map(|_| rng.gen_range(0..10));
        let offsets = OffsetBuffer::<i32>::from_lengths(entry_lengths);

        // The final offset is the total number of child values.
        let child_len = offsets.last().unwrap().to_usize().unwrap();
        let child = values(child_len);

        let validity = NullBuffer::from_iter((0..len).map(|_| rng.gen_bool(valid_percent)));
        let item_field = Arc::new(Field::new("item", child.data_type().clone(), true));

        ListArray::new(item_field, offsets, child, Some(validity))
    }

    /// Generates a random column of `len` rows, choosing one of 16 array kinds at random.
    fn generate_column(len: usize) -> ArrayRef {
        // Probability of a non-null slot requested from the generators below.
        const VALID: f64 = 0.8;

        let mut rng = thread_rng();
        let kind = rng.gen_range(0..16);
        match kind {
            // Primitives
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, VALID)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, VALID)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, VALID)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, VALID)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, VALID)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, VALID)),
            // Strings
            6 => Arc::new(generate_strings::<i32>(len, VALID)),
            // Dictionaries
            7 => {
                // Cannot test dictionaries containing null values because of #2687
                let dict_len = rng.gen_range(1..len);
                let dict_values = Arc::new(generate_strings::<i32>(dict_len, 1.0));
                Arc::new(generate_dictionary::<Int64Type>(dict_values, len, VALID))
            }
            8 => {
                // Cannot test dictionaries containing null values because of #2687
                let dict_len = rng.gen_range(1..len);
                let dict_values = Arc::new(generate_primitive_array::<Int64Type>(dict_len, 1.0));
                Arc::new(generate_dictionary::<Int64Type>(dict_values, len, VALID))
            }
            // Fixed-size binary and nested types
            9 => Arc::new(generate_fixed_size_binary(len, VALID)),
            10 => Arc::new(generate_struct(len, VALID)),
            11 => Arc::new(generate_list(len, VALID, |child_len| {
                Arc::new(generate_primitive_array::<Int64Type>(child_len, VALID))
            })),
            12 => Arc::new(generate_list(len, VALID, |child_len| {
                Arc::new(generate_strings::<i32>(child_len, VALID))
            })),
            13 => Arc::new(generate_list(len, VALID, |child_len| {
                Arc::new(generate_struct(child_len, VALID))
            })),
            // View types
            14 => Arc::new(generate_string_view(len, VALID)),
            15 => Arc::new(generate_byte_view(len, VALID)),
            _ => unreachable!(),
        }
    }

    /// Renders row `row` of every column in `cols` as a comma-separated string.
    ///
    /// Null top-level values are rendered as `NULL`.
    fn print_row(cols: &[SortColumn], row: usize) -> String {
        let opts = FormatOptions::default().with_null("NULL");

        let mut cells = Vec::with_capacity(cols.len());
        for col in cols {
            let cell = if col.values.is_valid(row) {
                let formatter = ArrayFormatter::try_new(col.values.as_ref(), &opts).unwrap();
                formatter.value(row).to_string()
            } else {
                "NULL".to_string()
            };
            cells.push(cell);
        }
        cells.join(",")
    }

    /// Renders the data type of every column in `cols` as a comma-separated string.
    fn print_col_types(cols: &[SortColumn]) -> String {
        let mut type_names = Vec::with_capacity(cols.len());
        for col in cols {
            type_names.push(col.values.data_type().to_string());
        }
        type_names.join(",")
    }

    /// Fuzzes the row encoding against `LexicographicalComparator`.
    ///
    /// For 100 random column sets, every pair of rows must compare the same way through
    /// the row encoding as through the comparator, and the rows must convert back to
    /// valid arrays matching the inputs.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = thread_rng();
            let num_columns = rng.gen_range(1..5);
            let len = rng.gen_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // One random sort configuration per column.
            let options: Vec<_> = (0..num_columns)
                .map(|_| {
                    let descending = rng.gen_bool(0.5);
                    let nulls_first = rng.gen_bool(0.5);
                    SortOptions {
                        descending,
                        nulls_first,
                    }
                })
                .collect();

            // Reference ordering.
            let sort_columns: Vec<_> = arrays
                .iter()
                .zip(&options)
                .map(|(array, opts)| SortColumn {
                    values: Arc::clone(array),
                    options: Some(*opts),
                })
                .collect();
            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            // Ordering under test.
            let fields = arrays
                .iter()
                .zip(options)
                .map(|(array, opts)| SortField::new_with_options(array.data_type().clone(), opts))
                .collect();
            let converter = RowConverter::new(fields).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            for i in 0..len {
                for j in 0..len {
                    let (row_i, row_j) = (rows.row(i), rows.row(j));
                    let actual = row_i.cmp(&row_j);
                    let expected = comparator.compare(i, j);
                    assert_eq!(
                        actual,
                        expected,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            let decoded = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in decoded.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected);
            }
        }
    }

    /// A cleared row buffer must be reusable and must only contain what was appended
    /// after the most recent clear.
    #[test]
    fn test_clear() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let mut rows = converter.empty_rows(3, 128);

        let arrays = [
            Arc::new(Int32Array::from(vec![None, Some(2), Some(4)])) as ArrayRef,
            Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef,
        ];

        // Reuse one buffer for every array; each round trip must see only that array.
        for array in &arrays {
            rows.clear();
            converter.append(&mut rows, &[Arc::clone(array)]).unwrap();
            let decoded = converter.convert_rows(&rows).unwrap();
            assert_eq!(&decoded[0], array);
        }

        // `rows` now holds the last array; compare it with a buffer built from scratch.
        let mut rows_expected = converter.empty_rows(3, 128);
        converter.append(&mut rows_expected, &arrays[1..]).unwrap();

        let row_pairs = rows.iter().zip(rows_expected.iter());
        for (i, (actual, expected)) in row_pairs.enumerate() {
            assert_eq!(
                actual, expected,
                "For row {}: expected {:?}, actual: {:?}",
                i, expected, actual
            );
        }
    }

    /// Appends a `Dictionary(Int32, Binary)` column to pre-allocated rows, converts the
    /// rows back, and compares the result with the input via `dictionary_eq`.
    #[test]
    fn test_append_codec_dictionary_binary() {
        // Dictionary RowConverter
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary));
        let converter = RowConverter::new(vec![SortField::new(dict_type)]).unwrap();
        let mut rows = converter.empty_rows(4, 128);

        // Four keys, each referencing its own single-byte value.
        let dict_keys = Int32Array::from_iter_values([0, 1, 2, 3]);
        let dict_values = BinaryArray::from(vec![
            Some("a".as_bytes()),
            Some(b"b"),
            Some(b"c"),
            Some(b"d"),
        ]);
        let dictionary = DictionaryArray::new(dict_keys, Arc::new(dict_values));
        let input = Arc::new(dictionary) as ArrayRef;

        rows.clear();
        converter.append(&mut rows, &[Arc::clone(&input)]).unwrap();
        let decoded = converter.convert_rows(&rows).unwrap();

        dictionary_eq(&decoded[0], &input);
    }

    /// A list that is a strict prefix of another list must sort before it.
    #[test]
    fn test_list_prefix() {
        // Row 0: [null], row 1: [null, null]
        let mut list_builder = ListBuilder::new(Int8Builder::new());
        list_builder.append_value([None]);
        list_builder.append_value([None, None]);
        let list = list_builder.finish();

        let sort_field = SortField::new(list.data_type().clone());
        let converter = RowConverter::new(vec![sort_field]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(list) as _]).unwrap();

        let ordering = rows.row(0).cmp(&rows.row(1));
        assert_eq!(ordering, Ordering::Less);
    }
}
